package org.codehaus.activemq.ra;

import java.util.HashMap;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.XAConnection;
import javax.jms.XASession;
import javax.resource.NotSupportedException;
import javax.resource.ResourceException;
import javax.resource.spi.ActivationSpec;
import javax.resource.spi.BootstrapContext;
import javax.resource.spi.ResourceAdapter;
import javax.resource.spi.ResourceAdapterInternalException;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.transaction.xa.XAResource;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.ActiveMQConnectionFactory;
import org.codehaus.activemq.ActiveMQXAConnectionFactory;

public class ActiveMQResourceAdapter
  implements ResourceAdapter
{
  private static final Log log = LogFactory.getLog(ActiveMQResourceAdapter.class);
  private static final String ASF_ENDPOINT_WORKER_TYPE = "asf";
  private static final String POLLING_ENDPOINT_WORKER_TYPE = "polling";
  private BootstrapContext bootstrapContext;
  private HashMap endpointWorkers = new HashMap();
  private final ActiveMQConnectionRequestInfo info = new ActiveMQConnectionRequestInfo();
  private String endpointWorkerType = "polling";
  private Connection physicalConnection;
  private ConnectionFactory connectionFactory;

  public ActiveMQResourceAdapter()
  {
    this(null);
  }

  public ActiveMQResourceAdapter(ConnectionFactory connectionFactory) {
    this.connectionFactory = connectionFactory;
  }

  public void start(BootstrapContext bootstrapContext)
    throws ResourceAdapterInternalException
  {
    this.bootstrapContext = bootstrapContext;
    try
    {
      this.physicalConnection = makeConnection(this.connectionFactory, this.info);
      this.physicalConnection.start();
    }
    catch (JMSException e) {
      throw new ResourceAdapterInternalException("Could not establish a connection to the server.", e);
    }
  }

  public static Connection makeConnection(ConnectionFactory connectionFactory, ActiveMQConnectionRequestInfo info)
    throws JMSException
  {
    if (connectionFactory == null) {
      if (info.isXa()) {
        connectionFactory = new ActiveMQXAConnectionFactory(info.getServerUrl());
      }
      else {
        connectionFactory = new ActiveMQConnectionFactory(info.getServerUrl());
      }
    }
    Connection physicalConnection = connectionFactory.createConnection(info.getUserName(), info.getPassword());
    if (info.getClientid() != null) {
      physicalConnection.setClientID(info.getClientid());
    }
    return physicalConnection;
  }

  public Connection getPhysicalConnection()
  {
    return this.physicalConnection;
  }

  public void stop()
  {
    this.bootstrapContext = null;
    if (this.physicalConnection != null)
      try {
        this.physicalConnection.close();
        this.physicalConnection = null;
      }
      catch (JMSException e) {
        log.debug("Error occured during ResourceAdapter stop: ", e);
      }
  }

  public BootstrapContext getBootstrapContext()
  {
    return this.bootstrapContext;
  }

  public void endpointActivation(MessageEndpointFactory endpointFactory, ActivationSpec activationSpec)
    throws ResourceException
  {
    if (activationSpec.getResourceAdapter() != this) {
      throw new ResourceException("Activation spec not initialized with this ResourceAdapter instance");
    }

    if (activationSpec.getClass().equals(ActiveMQActivationSpec.class))
    {
      ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory, (ActiveMQActivationSpec)activationSpec);

      if (this.endpointWorkers.containsKey(key))
        throw new IllegalStateException("Endpoint previously activated");
      ActiveMQBaseEndpointWorker worker;
   //   ActiveMQBaseEndpointWorker worker;
      if ("polling".equals(getEndpointWorkerType()))
        worker = new ActiveMQPollingEndpointWorker(this, key);
      else if ("asf".equals(getEndpointWorkerType()))
        worker = new ActiveMQAsfEndpointWorker(this, key);
      else {
        throw new NotSupportedException("That type of EndpointWorkerType is not supported: " + getEndpointWorkerType());
      }

      this.endpointWorkers.put(key, worker);
      worker.start();
    }
    else
    {
      throw new NotSupportedException("That type of ActicationSpec not supported: " + activationSpec.getClass());
    }
  }

  public void endpointDeactivation(MessageEndpointFactory endpointFactory, ActivationSpec activationSpec)
  {
    if (activationSpec.getClass().equals(ActiveMQActivationSpec.class)) {
      ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory, (ActiveMQActivationSpec)activationSpec);
      ActiveMQBaseEndpointWorker worker = (ActiveMQBaseEndpointWorker)this.endpointWorkers.get(key);
      if (worker == null)
      {
        return;
      }
      try {
        worker.stop();
      }
      catch (InterruptedException e)
      {
        Thread.currentThread().interrupt();
      }
    }
  }

  public XAResource[] getXAResources(ActivationSpec[] activationSpecs)
    throws ResourceException
  {
    try
    {
      Connection connection = getPhysicalConnection();
      if ((connection instanceof XAConnection)) {
        XASession session = ((XAConnection)connection).createXASession();
        XAResource xaResource = session.getXAResource();
        return new XAResource[] { xaResource };
      }
      return new XAResource[0];
    } catch (JMSException e) {
    	throw new ResourceException(e);
    }
    
  }

  public String getClientid()
  {
    return this.info.getClientid();
  }

  public String getPassword()
  {
    return this.info.getPassword();
  }

  public String getServerUrl()
  {
    return this.info.getServerUrl();
  }

  public String getUserName()
  {
    return this.info.getUserName();
  }

  public void setClientid(String clientid)
  {
    this.info.setClientid(clientid);
  }

  public void setPassword(String password)
  {
    this.info.setPassword(password);
  }

  public void setServerUrl(String url)
  {
    this.info.setServerUrl(url);
  }

  public void setUserName(String userid)
  {
    this.info.setUserName(userid);
  }

  public Boolean isXA() {
    return Boolean.valueOf(this.info.isXa());
  }

  public void setXA(Boolean xa) {
    this.info.setXa(xa.booleanValue());
  }

  public String getEndpointWorkerType()
  {
    return this.endpointWorkerType;
  }

  public void setEndpointWorkerType(String endpointWorkerType)
  {
    this.endpointWorkerType = endpointWorkerType;
  }

  public ActiveMQConnectionRequestInfo getInfo()
  {
    return this.info;
  }
}